package sink;

import bean.SensorReading;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

import java.util.Properties;

/**
 * @Description: TODO QQ1667847363
 * @author: xiao kun tai
 * @date:2021/11/3 16:31
 */
public class Sink1_Kafka {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        // kafka 配置项
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.88.106:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        DataStream<String> inputStream = env.addSource( new
                FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));


        DataStream<String> dataStream = inputStream.map(line->{
            System.out.println(line);
            String[] fields = line.split(",");
            return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2])).toString();
        });

        dataStream.addSink(new FlinkKafkaProducer011<String>("192.168.88.106:9092",
                "sinktest",new SimpleStringSchema()));



        env.execute("kafka");

    }
}
